package cn.com.tranzvision.kafka;

import cn.com.tranzvision.bean.CanalRowData;
import cn.com.tranzvision.util.ConfigUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Kafka producer helper that publishes {@link CanalRowData} records to the
 * topic returned by {@link ConfigUtil#kafkaTopic()}.
 *
 * <p>Each instance owns one {@link KafkaProducer}. Call {@link #close()} (or
 * use try-with-resources) when the sender is no longer needed so the
 * underlying producer is shut down.
 *
 * @author zwl
 * @date : 2021/7/19 16:17
 */
public class KafkaSender implements AutoCloseable {
    /** Producer owned by this sender; created once in the constructor. */
    private final KafkaProducer<String, CanalRowData> kafkaProducer;

    /**
     * Builds the producer configuration from {@link ConfigUtil} and
     * instantiates the underlying {@link KafkaProducer}.
     */
    public KafkaSender() {
        // The properties are only needed while constructing the producer,
        // so they are kept local instead of being retained as a field.
        Properties kafkaProps = new Properties();

        // Kafka cluster (bootstrap servers) address
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigUtil.kafkaBootstrap_servers_config());
        // Producer batch size setting
        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, ConfigUtil.kafkaBatch_size_config());
        // Acknowledgement (acks) setting
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, ConfigUtil.kafkaAcks());
        // Number of retries
        kafkaProps.put(ProducerConfig.RETRIES_CONFIG, ConfigUtil.kafkaRetries());
        // Client id of this producer
        kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigUtil.kafkaClient_id_config());
        // Serializer class for record keys
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaKey_serializer_class_config());
        // Serializer class for record values
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaValue_serializer_class_config());

        // Instantiate the producer
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    /**
     * Hands the given row to the producer for writing to the Kafka cluster.
     *
     * <p>The record is created without a key and targets the topic returned by
     * {@link ConfigUtil#kafkaTopic()}, which is read on every call. The
     * {@code Future} returned by the producer is not inspected, so delivery
     * failures are not reported to the caller.
     *
     * @param rowData the row data to publish as the record value
     */
    public void send(CanalRowData rowData) {
        kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), rowData));
    }

    /**
     * Closes the underlying producer.
     *
     * <p>Per the Kafka client documentation, {@code KafkaProducer.close()}
     * waits for previously sent requests to complete before releasing the
     * producer's resources. The sender must not be used after this call.
     */
    @Override
    public void close() {
        kafkaProducer.close();
    }

}
